RxJava হল একটি শক্তিশালী টুল যা asynchronous এবং event-driven প্রোগ্রামিং সহজ করে। তবে এটি সঠিকভাবে ব্যবহার না করলে কোড জটিল এবং maintenance-এ কঠিন হতে পারে। এখানে উদাহরণ এবং Best Practices নিয়ে আলোচনা করা হলো।
উদাহরণ: RxJava-এর ব্যবহার
1. Basic Example
import io.reactivex.Observable;
public class RxJavaExample {
public static void main(String[] args) {
Observable<String> observable = Observable.just("RxJava", "is", "powerful!");
observable
.map(String::toUpperCase)
.subscribe(
item -> System.out.println("Received: " + item), // onNext
throwable -> System.out.println("Error: " + throwable), // onError
() -> System.out.println("Done!") // onComplete
);
}
}
Output:
Received: RXJAVA
Received: IS
Received: POWERFUL!
Done!
2. Using Schedulers for Background Work
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class SchedulerExample {
public static void main(String[] args) throws InterruptedException {
Observable.range(1, 5)
.subscribeOn(Schedulers.io()) // Background thread for data emission
.observeOn(Schedulers.computation()) // Computation thread for processing
.map(i -> i * i)
.subscribe(
item -> System.out.println("Processed: " + item + " on " + Thread.currentThread().getName()),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("Processing Complete!")
);
Thread.sleep(1000); // Wait for background threads to complete
}
}
Output (Thread Names Vary):
Processed: 1 on RxComputationThreadPool-1
Processed: 4 on RxComputationThreadPool-1
Processed: 9 on RxComputationThreadPool-1
Processed: 16 on RxComputationThreadPool-1
Processed: 25 on RxComputationThreadPool-1
Processing Complete!
3. Combining Observables with merge()
import io.reactivex.Observable;
public class MergeExample {
public static void main(String[] args) {
Observable<String> observable1 = Observable.just("A", "B", "C");
Observable<String> observable2 = Observable.just("1", "2", "3");
Observable.merge(observable1, observable2)
.subscribe(
item -> System.out.println("Received: " + item),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("Merged Streams Completed!")
);
}
}
Output:
Received: A
Received: B
Received: C
Received: 1
Received: 2
Received: 3
Merged Streams Completed!
4. Error Handling with onErrorResumeNext
import io.reactivex.Observable;
public class ErrorHandlingExample {
public static void main(String[] args) {
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Something went wrong!"));
});
observable
.onErrorResumeNext(Observable.just(3, 4, 5)) // Fallback Observable
.subscribe(
item -> System.out.println("Received: " + item),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("Stream Completed!")
);
}
}
Output:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Stream Completed!
Best Practices
1. Use Proper Threading
- Use
Schedulersto offload heavy operations to background threads. - Avoid blocking the main thread in Android or UI-heavy applications.
Example:
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
2. Avoid Memory Leaks
- Always dispose of subscriptions when they are no longer needed.
- Use
CompositeDisposableto manage multiple subscriptions.
Example:
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(
observable.subscribe(item -> System.out.println(item))
);
// Dispose when done
disposables.clear();
3. Error Handling
- Always handle errors using
onErrorResumeNext,onErrorReturn, orretry.
Example:
observable
.onErrorReturnItem("Fallback Value")
.subscribe(System.out::println);
4. Keep Streams Simple
- Avoid chaining too many operators in a single stream; break it down for readability.
5. Use Hot vs Cold Observables Correctly
- Use Cold Observables (default) when each subscriber should get a fresh data stream.
- Use Hot Observables when the stream should be shared across multiple subscribers.
Example (Hot Observable):
ConnectableObservable<Integer> hotObservable = Observable.range(1, 5).publish();
hotObservable.connect();
6. Backpressure Management
- Use Flowable for handling large or infinite streams to avoid
OutOfMemoryError.
Example:
Flowable.range(1, 1000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.subscribe(System.out::println);
RxJava একটি জটিল টুল, কিন্তু সঠিকভাবে ব্যবহার করলে এটি asynchronous প্রোগ্রামিং এবং event-driven সিস্টেমগুলিকে অত্যন্ত কার্যকর এবং maintainable করে তোলে।
Content added By
Read more